package com.itqiqi.api.window;

import com.itqiqi.api.pojo.SensorReading;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import scala.Tuple2;

/**
 * Count-window demo job: reads comma-separated sensor lines from
 * {@code input/sensor.txt}, keys them by the {@code id} field and prints the
 * average temperature computed over a count window ({@code countWindow(10, 2)}).
 */
public class WindowTest2_CountWindow {

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        // Obtain the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 so that records are handled in order.
        env.setParallelism(1);

        DataStreamSource<String> inputStream = env.readTextFile("input/sensor.txt");

        // Convert each text line into a SensorReading; the line is split on ","
        // and the first three fields are used in order.
        SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map((MapFunction<String, SensorReading>) line -> {
            String[] fields = line.split(",");
            // valueOf replaces the deprecated new Long(String) / new Double(String) constructors.
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });

        // Count-window test: incrementally aggregate the average temperature per key.
        SingleOutputStreamOperator<Double> resStream = dataStream.keyBy("id")
                .countWindow(10, 2)
                .aggregate(new AggregateFunction<SensorReading, Tuple2<Double, Integer>, Double>() {
                    // Accumulator layout: _1 = running temperature sum, _2 = element count.

                    @Override
                    public Tuple2<Double, Integer> createAccumulator() {
                        return new Tuple2<>(0.0, 0);
                    }

                    @Override
                    public Tuple2<Double, Integer> add(SensorReading reading, Tuple2<Double, Integer> acc) {
                        return new Tuple2<>(reading.getTemperature() + acc._1, acc._2 + 1);
                    }

                    @Override
                    public Double getResult(Tuple2<Double, Integer> acc) {
                        // Average = sum / count (the operands were previously swapped,
                        // which produced count / sum).
                        return acc._1 / acc._2;
                    }

                    @Override
                    public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> left, Tuple2<Double, Integer> right) {
                        return new Tuple2<>(left._1 + right._1, left._2 + right._2);
                    }
                });

        resStream.print();

        env.execute();
    }

}
